package drds.plus.executor.transaction.cobar_style;

import drds.plus.datanode.api.Connection;
import drds.plus.datanode.api.DatasourceManager;
import drds.plus.executor.ExecuteContext;
import drds.plus.executor.transaction.AbstractTransaction;
import drds.plus.executor.transaction.strict_write_with_non_transaction_cross_database_read.ReadWrite;

import java.sql.SQLException;
import java.util.Set;

/**
 * Transaction that supports spanning several databases: every connection handed out
 * through {@link #getConnection} has a transaction started on it via
 * {@code connection.setAutoCommit(false)}.
 *
 * <p>On {@link #commit()} and {@link #rollback()} the connections held by the
 * {@code ConnectionHolder} are processed one after another; this class performs no
 * additional coordination between them.
 *
 * <p>All operations run under the lock inherited from {@code AbstractTransaction}.
 * {@code commit()} and {@code rollback()} call {@link #close()} while already holding that
 * lock, so the code relies on the lock being reentrant.
 */
public class CobarStyleTransaction extends AbstractTransaction {

    /** Holder that hands out and tracks the connections used by this transaction. */
    private final drds.plus.executor.transaction.ConnectionHolder connectionHolder;

    /**
     * Creates the transaction with a fresh {@code CobarStyleConnectionHolder}.
     *
     * @param executeContext execution context passed through to the superclass
     */
    public CobarStyleTransaction(ExecuteContext executeContext) {
        super(executeContext);

        connectionHolder = new CobarStyleConnectionHolder();
    }

    /**
     * Obtains the connection for the given data node from the holder and switches it to
     * manual-commit mode.
     *
     * @param dataNodeId        id of the data node; must not be {@code null}
     * @param datasourceManager data source manager forwarded to the connection holder
     * @param readWrite         not used by this implementation
     * @return the connection, with auto-commit disabled
     * @throws IllegalArgumentException if {@code dataNodeId} is {@code null}
     * @throws SQLException             if the holder cannot provide the connection or
     *                                  auto-commit cannot be disabled
     */
    public Connection getConnection(String dataNodeId, DatasourceManager datasourceManager, ReadWrite readWrite) throws SQLException {
        if (dataNodeId == null) {
            throw new IllegalArgumentException("datanode columnName is null");
        }

        lock.lock();
        try {
            Connection connection = this.getConnectionHolder().getConnection(dataNodeId, datasourceManager);
            connection.setAutoCommit(false);
            return connection;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Commits every connection held by this transaction, then closes the transaction.
     *
     * <p>If a commit fails, the exception propagates immediately: the remaining connections
     * are not committed and {@link #close()} is not called, so the held connections stay
     * available.
     * NOTE(review): callers are presumably expected to invoke {@link #rollback()} after a
     * failed commit - confirm against the call sites.
     *
     * @throws RuntimeException wrapping the {@link SQLException} of the first failed commit
     */
    public void commit() throws RuntimeException {
        lock.lock();
        try {
            Set<Connection> connectionSet = this.getConnectionHolder().getConnectionSet();

            for (Connection connection : connectionSet) {
                try {
                    connection.commit();
                } catch (SQLException e) {
                    throw new RuntimeException(e);
                }
            }

            close();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Rolls back every connection held by this transaction, then closes the transaction.
     *
     * <p>The rollback is attempted on all connections even if some of them fail, and
     * {@link #close()} is always invoked afterwards, so one broken connection neither leaves
     * the others un-rolled-back nor leaks them. The first failure is thrown once the cleanup
     * is done; any later failures are attached to it as suppressed exceptions.
     *
     * @throws RuntimeException wrapping the first {@link SQLException} raised by a rollback,
     *                          or the exception raised by {@code close()} if every rollback
     *                          succeeded
     */
    public void rollback() throws RuntimeException {
        lock.lock();
        try {
            RuntimeException failure = null;

            for (Connection connection : this.getConnectionHolder().getConnectionSet()) {
                try {
                    connection.rollback();
                } catch (SQLException e) {
                    // Remember the failure but keep going: the other connections still
                    // need their rollback.
                    if (failure == null) {
                        failure = new RuntimeException(e);
                    } else {
                        failure.addSuppressed(e);
                    }
                }
            }

            try {
                close();
            } catch (RuntimeException closeFailure) {
                if (failure == null) {
                    throw closeFailure;
                }
                // Keep the rollback failure as the primary error.
                failure.addSuppressed(closeFailure);
            }

            if (failure != null) {
                throw failure;
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Delegates to the connection holder's {@code tryClose} for the given connection.
     *
     * @param dataNodeId id of the data node the connection belongs to
     * @param connection connection to hand back to the holder
     * @throws SQLException if the holder fails to process the connection
     */
    public void tryClose(String dataNodeId, Connection connection) throws SQLException {
        lock.lock();
        try {
            this.getConnectionHolder().tryClose(dataNodeId, connection);
        } finally {
            lock.unlock();
        }
    }

    /**
     * @return the connection holder backing this transaction
     */
    public drds.plus.executor.transaction.ConnectionHolder getConnectionHolder() {
        return this.connectionHolder;
    }

    /**
     * Closes the transaction and all connections held by the connection holder. Does nothing
     * if the transaction is already closed.
     *
     * <p>The closed check is made while holding the lock so that concurrent callers cannot
     * both pass the check and close twice.
     */
    public void close() {
        lock.lock();
        try {
            if (isClosed()) {
                return;
            }

            super.close();

            this.getConnectionHolder().closeConnectionSet();
        } finally {
            lock.unlock();
        }
    }

    /**
     * No-op in this implementation.
     *
     * @throws SQLException never thrown here; declared as part of the method signature
     */
    public void tryClose() throws SQLException {

    }

}
